A good overview: https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html
List of public time series datasets: https://github.com/awesomedata/awesome-public-datasets#timeseries
Don't forget to cite the UCR archive:
@misc{UCRArchive2018,
title = {The UCR Time Series Classification Archive},
author = {Dau, Hoang Anh and Keogh, Eamonn and Kamgar, Kaveh and Yeh, Chin-Chia Michael and Zhu, Yan
and Gharghabi, Shaghayegh and Ratanamahatana, Chotirat Ann and Yanping and Hu, Bing
and Begum, Nurjahan and Bagnall, Anthony and Mueen, Abdullah and Batista, Gustavo},
year = {2018},
month = {October},
note = {\url{https://www.cs.ucr.edu/~eamonn/time_series_data_2018/}}
}
%reload_ext autoreload
%autoreload 2
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import preprocessing
from time_series_characteristics import TimeSeriesCharacteristics, NormalizedTimeSeriesCharacteristics
import util
Specify the following two variables:
- UCR_ARCHIVE: the path to the UCR archive main directory (contains the dataset folders).
- UCR_DATASET: the name of the UCR dataset to load. By default, the TEST split will be loaded.
UCR_ARCHIVE = INSERT_PATH_TO_UCR_MAIN_DIR_HERE
UCR_DATASET = "Rock"
def read_ucr_file(path):
    """Read the UCR file from the specified path into a DataFrame with a multi-index ('entity', 'time') and a single column 'value'"""
    raw = pd.read_csv(path, sep="\t", header=None, dtype=np.float64)
    # one entity id per row: "<filename>_<row-number>"
    entities = [f"{os.path.split(path)[1]}_{row}" for row in raw.index]
    # single-level MultiIndex so the labels keep an 'entity' index too
    raw.index = pd.MultiIndex.from_tuples([(e,) for e in entities], names=["entity"])
    labels = raw.iloc[:, 0].astype(np.int32, copy=False)
    values = raw.iloc[:, 1:].copy()
    # z-normalize each time series (row-wise)
    values[:] = preprocessing.scale(values, axis=1)
    # add entity "index" which is just a counter for ucr
    values["entity"] = entities
    # un-pivot: one row per (entity, time) observation
    values = pd.melt(values, id_vars=["entity"], value_name="value", var_name="time")
    # transform int-based time values to datetime (-1 because pandas labelled the first data column with 1 instead of 0)
    values["time"] = pd.to_datetime(values["time"] - 1, unit="m")
    values.set_index(["entity", "time"], inplace=True, verify_integrity=True)
    values.sort_index(inplace=True)
    return values, labels
def calculate_min_max_df(standardized_df, features_to_normalize, quantiles, index_levels, column_value, verbose):
    """Given a standardized DataFrame, calculate the quantiles for the specified features"""
    # evaluate every feature (incl. parameter combinations) on the data
    feature_df = util.get_feature_combination_df(
        features_to_normalize,
        standardized_df,
        index_levels=index_levels,
        column_value=column_value,
        verbose=verbose)
    # transpose: one row per feature, one column per requested quantile
    bounds = feature_df.quantile(quantiles).T
    # stringify the (float) quantile column labels, e.g. 0.05 -> "0.05"
    bounds.columns = bounds.columns.map(str)
    return bounds
def random_series(dtindex=False, plot=True, ax=None, random_state=None):
    """Get a random time series from the archive"""
    rng = np.random.RandomState(random_state)
    entity = rng.choice(DATA.index.get_level_values("entity"))
    values = DATA.loc[entity]["value"].values
    if plot:
        own_axis = ax is None
        if own_axis:
            plt.figure(figsize=(4, 1))
            ax = plt.gca()
        ax.plot(values)
        if own_axis:
            plt.show()
    if dtindex:
        # attach a minute-spaced datetime index so frequency-based features work
        timestamps = pd.to_datetime(list(range(len(values))), unit="m")
        values = pd.Series(values, index=timestamps, name="value").asfreq("1min")
    return values
def apply(fun, nplots=10, extra=None, dtindex=False, **kwargs):
    """Apply function fun on a random sample of `nplots` time series and plot them along with their statistics.

    Parameters
    ----------
    fun : callable
        Feature function; called as fun(ts, **kwargs) on each sampled series.
    nplots : int
        Number of series to sample and plot (default 10).
    extra : callable, optional
        Post-processor turning fun's raw return value into a scalar or string for the title.
    dtindex : bool
        If True, each series carries a datetime index (see random_series).
    """
    _, axs = plt.subplots(1, nplots, figsize=(30, 1))
    # plt.subplots returns a bare Axes (not an array) when nplots == 1,
    # which would make enumerate(axs) fail; atleast_1d handles both cases
    for i, ax in enumerate(np.atleast_1d(axs)):
        ts = random_series(ax=ax, dtindex=dtindex, random_state=i)
        val = fun(ts, **kwargs)
        if extra is not None:
            val = extra(val)
        ax.title.set_text("{:.5G}".format(val) if not isinstance(val, str) else val)
def list_format(e):
    """Can be used to format list-like return values in apply"""
    formatted = ("{:.5G}".format(value) for value in e)
    return "[" + ", ".join(formatted) + "]"
def list_format_tuple(e):
    """Can be used to format list-like return values in apply"""
    # each element is a (name, value) pair; only the value is shown
    parts = ["{:.5G}".format(pair[1]) for pair in e]
    return "[" + ", ".join(parts) + "]"
def list_format_dict_values(e):
    """Can be used to format list-like return values in apply"""
    # dicts preserve insertion order, so the output order is deterministic
    return "[" + ", ".join("{:.5G}".format(v) for v in e.values()) + "]"
# Load the TEST split of the chosen UCR dataset. The earlier version of this
# cell read the TSV twice (a plain read_csv whose result was immediately
# overwritten); read_ucr_file alone handles parsing, scaling, and indexing.
DATA, LABELS = read_ucr_file(os.path.join(UCR_ARCHIVE, UCR_DATASET, f"{UCR_DATASET}_TEST.tsv"))
Specify which normalized features and which parameters you want to investigate. This step is only required if normalized features are needed (via NormalizedTimeSeriesCharacteristics).
- params: the parameters of the features you want to normalize for investigation.
- features_to_normalize: the features (including the above parameters) you want to normalize for investigation.
- funcs_to_merge: the groups of parameterized features that should be normalized in bulk, rather than individually.
tsc = TimeSeriesCharacteristics()
# Parameter grid for the parameterized features investigated below.
params = {
    "block_sizes": [30, 60],
    "quantiles": [0.25, 0.5, 0.75],
    "periods": ["1h", "2h"],
    "periodogram_agg_funcs": ["min", ("quantile", dict(q=0.25)), "median", ("quantile", dict(q=0.75)), "max"],
    "lags": [1, 2, 3],
}
# Features (optionally paired with a parameter dict) whose raw values are
# min-max normalized; grouped as distributional / temporal / complexity.
features_to_normalize = [
    # distributional features
    tsc.kurtosis,
    tsc.skewness,
    tsc.shift,
    *((tsc.lumpiness, dict(block_size=size)) for size in params["block_sizes"]),
    *((tsc.quantile, dict(q=quant)) for quant in params["quantiles"]),
    tsc.ratio_large_standard_deviation,
    # temporal features
    tsc.mean_second_derivative_central,
    *((tsc.level_shift, dict(block_size=size)) for size in params["block_sizes"]),
    *((tsc.variance_change, dict(block_size=size)) for size in params["block_sizes"]),
    (tsc.periodicity, dict(dt_min=1.0, periods=params["periods"])),
    (tsc.agg_periodogram, dict(dt_min=1.0, funcs=params["periodogram_agg_funcs"])),
    *((tsc.time_reversal_asymmetry_statistic, dict(lag=shift)) for shift in params["lags"]),
    tsc.linear_trend_slope,
    (tsc.agg_linear_trend_slope, dict(block_sizes=params["block_sizes"])),
    *((tsc.c3, dict(lag=shift)) for shift in params["lags"]),
    # complexity features
    *((tsc.kullback_leibler_score, dict(block_size=size)) for size in params["block_sizes"]),
    tsc.cid_ce,
]
# Parameterized feature families normalized jointly (one min/max per family)
# instead of once per parameter combination.
funcs_to_merge = [
    "quantile",
    "periodicity",
    "agg_periodogram",
    "time_reversal_asymmetry_statistic",
    "c3",
]
# Estimate per-feature bounds as the 5%/95% quantiles over the archive and
# initialize the normalized characteristics with them.
ntsc = NormalizedTimeSeriesCharacteristics()
quantile_bounds = calculate_min_max_df(
    DATA, features_to_normalize, quantiles=[0.05, 0.95],
    index_levels=["entity"], column_value="value", verbose=True)
ntsc.init(features=features_to_normalize, min_max_df=quantile_bounds,
          funcs_to_merge=funcs_to_merge, column_min="0.05", column_max="0.95")
Distributional features do not depend upon the temporal structure of the data, i.e., they view the time series as a set of unordered values.
# Distributional features: raw (tsc) vs. min-max normalized (ntsc) variants.
apply(tsc.kurtosis)
apply(ntsc.kurtosis)
apply(tsc.skewness)
apply(ntsc.skewness)
apply(tsc.shift)
apply(ntsc.shift)
# lumpiness/stability are block-based; use the smallest configured block size
apply(tsc.lumpiness, block_size=params["block_sizes"][0])
apply(ntsc.lumpiness, block_size=params["block_sizes"][0])
apply(tsc.stability, block_size=params["block_sizes"][0])
apply(ntsc.stability, block_size=params["block_sizes"][0])
- len(different values occurring more than once) / len(different values)
- # of data points occurring more than once / # values
- # unique values / # values
apply(tsc.normalized_duplicates_max)
apply(ntsc.normalized_duplicates_max)
apply(tsc.normalized_duplicates_min)
apply(ntsc.normalized_duplicates_min)
# reoccurring / unique value ratios
apply(tsc.percentage_of_reoccurring_datapoints)
apply(ntsc.percentage_of_reoccurring_datapoints)
apply(tsc.percentage_of_reoccurring_values)
apply(ntsc.percentage_of_reoccurring_values)
apply(tsc.percentage_of_unique_values)
apply(ntsc.percentage_of_unique_values)
# one raw/normalized row of plots per configured quantile
for q in params["quantiles"]:
    print(f"quantile q = {q:.2f}")
    apply(tsc.quantile, q=q)
    apply(ntsc.quantile, q=q)
    plt.show()
# fraction of values beyond r standard deviations from the mean, r = 1..3
for r in np.arange(1, 4):
    print(f"ratio beyond r = {r:.2f} x sigma")
    apply(tsc.ratio_beyond_r_sigma, r=r)
    apply(ntsc.ratio_beyond_r_sigma, r=r)
    plt.show()
apply(tsc.ratio_large_standard_deviation)
apply(ntsc.ratio_large_standard_deviation)
Temporal features take into account the temporal dependency of data points, i.e., they observe the frequency spectrum, seasonalities, correlations with the time axis, ...
# Temporal features: raw (tsc) vs. min-max normalized (ntsc) variants.
apply(tsc.mean_abs_change)
apply(ntsc.mean_abs_change)
apply(tsc.mean_second_derivative_central)
apply(ntsc.mean_second_derivative_central)
apply(tsc.level_shift, block_size=params["block_sizes"][0])
apply(ntsc.level_shift, block_size=params["block_sizes"][0])
apply(tsc.variance_change, block_size=params["block_sizes"][0])
apply(ntsc.variance_change, block_size=params["block_sizes"][0])
apply(tsc.hurst)
apply(ntsc.hurst)
for lag in params["lags"]:
    print(f"lag = {lag:d}")
    apply(tsc.autocorrelation, lag=lag)
    apply(ntsc.autocorrelation, lag=lag)
    plt.show()
# periodicity needs a datetime index (dtindex=True); the extra callback
# unpacks the per-period return structure for display
for period in params["periods"]:
    print(f"periodicity {period}")
    apply(tsc.periodicity, dtindex=True, extra=lambda e: e[0], periods=[period])
    apply(ntsc.periodicity, dtindex=True, extra=lambda e: e[0][1], periods=[period])
    plt.show()
apply(tsc.agg_periodogram, funcs=params["periodogram_agg_funcs"], nplots=4, dtindex=True, extra=list_format_tuple)
apply(ntsc.agg_periodogram, funcs=params["periodogram_agg_funcs"], nplots=4, dtindex=True, extra=list_format_tuple)
# linear trend: slope and r^2 of a least-squares fit over the whole series
print("slope")
apply(tsc.linear_trend_slope)
apply(ntsc.linear_trend_slope)
plt.show()
print("r^2")
apply(tsc.linear_trend_rvalue2)
apply(ntsc.linear_trend_rvalue2)
plt.show()
# chunked linear trends, aggregated over the chunks
print("variance of slopes with chunk size = {}".format(params["block_sizes"]))
apply(tsc.agg_linear_trend_slope, block_sizes=params["block_sizes"], extra=list_format_tuple)
apply(ntsc.agg_linear_trend_slope, block_sizes=params["block_sizes"], extra=list_format_tuple)
plt.show()
print("mean of r^2 values with chunk size = {}".format(params["block_sizes"]))
apply(tsc.agg_linear_trend_rvalue2, block_sizes=params["block_sizes"], extra=list_format_tuple)
apply(ntsc.agg_linear_trend_rvalue2, block_sizes=params["block_sizes"], extra=list_format_tuple)
plt.show()
for lag in params["lags"]:
    print(f"lag = {lag:d}")
    apply(tsc.c3, lag=lag)
    apply(ntsc.c3, lag=lag)
    plt.show()
for lag in params["lags"]:
    print(f"lag = {lag:d}")
    apply(tsc.time_reversal_asymmetry_statistic, lag=lag)
    apply(ntsc.time_reversal_asymmetry_statistic, lag=lag)
    plt.show()
Complexity Features measure the "randomness" of a time series, its entropy, etc.
# Complexity features: raw (tsc) vs. min-max normalized (ntsc) variants.
apply(tsc.binned_entropy, max_bins=10)
apply(ntsc.binned_entropy, max_bins=10)
apply(tsc.kullback_leibler_score, block_size=params["block_sizes"][0])
apply(ntsc.kullback_leibler_score, block_size=params["block_sizes"][0])
apply(tsc.index_of_kullback_leibler_score, block_size=params["block_sizes"][0])
# NOTE(review): the line below calls tsc again rather than ntsc, unlike every
# other raw/normalized pair — confirm whether this is intentional (e.g. no
# normalized variant exists) or a copy-paste slip.
apply(tsc.index_of_kullback_leibler_score, block_size=params["block_sizes"][0])
apply(tsc.cid_ce)
apply(ntsc.cid_ce)
# not implemented due to NDA
# apply(tsc.permutation_analysis)
# apply(ntsc.permutation_analysis)
apply(tsc.swinging_door_compression_rate, eps=0.1)
apply(ntsc.swinging_door_compression_rate, eps=0.1)
apply(tsc.normalized_crossing_points)
apply(ntsc.normalized_crossing_points)
apply(tsc.normalized_above_mean)
apply(ntsc.normalized_above_mean)
apply(tsc.normalized_below_mean)
apply(ntsc.normalized_below_mean)
apply(tsc.normalized_longest_strike_above_mean)
apply(ntsc.normalized_longest_strike_above_mean)
apply(tsc.normalized_longest_strike_below_mean)
apply(ntsc.normalized_longest_strike_below_mean)
# flat_spots returns a dict; format all of its values into the plot title
apply(tsc.flat_spots, nplots=2, extra=list_format_dict_values)
apply(ntsc.flat_spots, nplots=2, extra=list_format_dict_values)
apply(tsc.normalized_number_peaks, n=5)
apply(ntsc.normalized_number_peaks, n=5)
apply(tsc.step_changes, window_len=60)
apply(ntsc.step_changes, window_len=60)
Tests on stationarity, unit roots, etc. - https://www.statisticshowto.datasciencecentral.com/stationarity/ - https://www.statisticshowto.datasciencecentral.com/unit-root/ - https://arch.readthedocs.io/en/latest/unitroot/tests.html
# Stationarity / unit-root tests (ADF and KPSS)
apply(tsc.adf)
apply(ntsc.adf)
apply(tsc.kpss)
apply(ntsc.kpss)
tsfresh features deliberately not included here (trivial, redundant with the features above, or replaced by an alternative):
abs_energy(), standard_deviation(), partial_autocorrelation(...), ar_coefficient(...), change_quantiles(),
first_location_of_maximum(), first_location_of_minimum(), last_location_of_maximum(), last_location_of_minimum(),
friedrich_coefficients(), max_langevin_fixed_point(), index_mass_quantile(), range_count(), value_count(),
large_standard_deviation(), length(), number_crossing_m(), sum_of_reoccurring_data_points(), sum_of_reoccurring_values(),
sum_values(), num_duplicates(), num_duplicates_max(), num_duplicates_min(), has_duplicate(), has_duplicate_max(),
has_duplicate_min(), variance_larger_than_standard_deviation(), mean(), median(), maximum(), minimum(),
mean_change() (which is (a[-1] - a[0]) / (len(a) - 1)), absolute_sum_of_changes() (mean_abs_change(), but unnormalized),
agg_autocorrelation(x, param), spkt_welch_density(), fft_coefficient(), linear_trend_timewise(),
approximate_entropy(x, m, r) / sample_entropy(x) (use binned_entropy(x, max_bins) as an alternative),
number_cwt_peaks() (use number_peaks(x, n) as an alternative).

🟨 Index of Dispersion is a measure of burstiness (related: Fano Factor, Variance-to-Mean Ratio) - https://en.wikipedia.org/wiki/Index_of_dispersion
code:
def index_of_dispersion(data):
    """Index of dispersion (variance-to-mean ratio, a.k.a. Fano factor).

    Returns NaN when the mean is zero, where the ratio is undefined.
    """
    # The original called fc.standard_deviation(data), but `fc` is never
    # imported in this file (NameError at call time). np.std (population
    # standard deviation, ddof=0) is the drop-in equivalent.
    mu = np.mean(data)
    return np.std(data) ** 2 / mu if mu != 0 else np.nan
apply(index_of_dispersion)
Further candidate features that were not implemented:
- ma_coefficients(...)
- |mean - median| / (max - min) as a symmetry measure - https://en.wikipedia.org/wiki/Symmetric_probability_distribution#Properties
- correlation between y and y_per, where y_n is the n-th ACF lag of y - https://www.sciencedirect.com/science/article/pii/S095741741300078X (related: normalized_crossing_points())
- whether y_i is a local maximum or minimum for its two closest neighbours (covered by normalized_number_peaks(n) with n = 2)